%%%-------------------------------------------------------------------
%%% File    : mod_mam.erl
%%% Author  : Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%% Purpose : Message Archive Management (XEP-0313)
%%% Created :  4 Jul 2013 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2013-2022   ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%-------------------------------------------------------------------

-module(mod_mam).

-protocol({xep, 313, '0.6.1'}).
-protocol({xep, 334, '0.2'}).
-protocol({xep, 359, '0.5.0'}).
-protocol({xep, 441, '0.2.0'}).

-behaviour(gen_mod).

%% API
-export([start/2, stop/1, reload/3, depends/2, mod_doc/0]).

-export([sm_receive_packet/1, user_receive_packet/1, user_send_packet/1,
	 user_send_packet_strip_tag/1, process_iq_v0_2/1, process_iq_v0_3/1,
	 disco_sm_features/5, remove_user/2, remove_room/3, mod_opt_type/1,
	 muc_process_iq/2, muc_filter_message/3, message_is_archived/3,
	 delete_old_messages/2, get_commands_spec/0, msg_to_el/4,
	 get_room_config/4, set_room_option/3, offline_message/1, export/1,
	 mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
	 is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
	 process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7,
	 delete_old_messages_batch/5, delete_old_messages_status/1, delete_old_messages_abort/1]).

-include_lib("xmpp/include/xmpp.hrl").
-include("logger.hrl").
-include("mod_muc_room.hrl").
-include("ejabberd_commands.hrl").
-include("mod_mam.hrl").
-include("translate.hrl").

-define(DEF_PAGE_SIZE, 50).
-define(MAX_PAGE_SIZE, 250).

-type c2s_state() :: ejabberd_c2s:state().
-type count() :: non_neg_integer() | undefined.

%% Behaviour callbacks implemented by the storage backend module.
%% The backend module is resolved at run time with gen_mod:db_mod/2 and
%% invoked as Mod:Function(...) throughout this module.

%% Called from start/2 (and from reload/3 when the backend changes).
%% start/2 only proceeds when this returns 'ok'.
-callback init(binary(), gen_mod:opts()) -> any().
-callback remove_user(binary(), binary()) -> any().
-callback remove_room(binary(), binary(), binary()) -> any().
%% First argument is a server host or the atom 'global'
%% (see delete_old_messages/2, which passes 'global' for non-SQL backends).
-callback delete_old_messages(binary() | global,
			      erlang:timestamp(),
			      all | chat | groupchat) -> any().
%% Extra query fields advertised in the MAM query form in addition to
%% with/start/end.
-callback extended_fields() -> [mam_query:property() | #xdata_field{}].
-callback store(xmlel(), binary(), {binary(), binary()}, chat | groupchat,
		jid(), binary(), recv | send, integer()) -> ok | any().
-callback write_prefs(binary(), binary(), #archive_prefs{}, binary()) -> ok | any().
-callback get_prefs(binary(), binary()) -> {ok, #archive_prefs{}} | error | {error, db_failure}.
-callback select(binary(), jid(), jid(), mam_query:result(),
		 #rsm_set{} | undefined, chat | groupchat) ->
    {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
    {error, db_failure}.
-callback select(binary(), jid(), jid(), mam_query:result(),
		 #rsm_set{} | undefined, chat | groupchat,
		 all | only_count | only_messages) ->
		    {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
		    {error, db_failure}.
-callback use_cache(binary()) -> boolean().
-callback cache_nodes(binary()) -> [node()].
%% Third argument is the peer JID to restrict removal to, or 'none'
%% (see remove_mam_for_user/2 and remove_mam_for_user_with_peer/3).
-callback remove_from_archive(binary(), binary(), jid() | none) -> ok | {error, any()}.
-callback is_empty_for_user(binary(), binary()) -> boolean().
-callback is_empty_for_room(binary(), binary(), binary()) -> boolean().
-callback select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
			     #rsm_set{} | undefined, all | only_count | only_messages) ->
    {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
    {error, db_failure}.

%% Stateless batched deletion: returns the number of deleted messages.
-callback delete_old_messages_batch(binary(), erlang:timestamp(),
				    all | chat | groupchat,
				    pos_integer()) ->
    {ok, non_neg_integer()} | {error, term()}.

%% Batched deletion that threads an opaque backend state (last argument)
%% between consecutive batches; the updated state is returned with the count.
-callback delete_old_messages_batch(binary(), erlang:timestamp(),
				    all | chat | groupchat,
				    pos_integer(), any()) ->
    {ok, any(), non_neg_integer()} | {error, term()}.

%% Backends may omit these callbacks.
-optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/6, select/6, select/7,
    delete_old_messages_batch/5, delete_old_messages_batch/4]).

%%%===================================================================
%%% API
%%%===================================================================
%% gen_mod callback: initialise the storage backend for Host and, when that
%% succeeds, set up the cache, IQ handlers, hooks and ejabberd commands.
%% Returns 'ok', or whatever non-ok value the backend's init/2 returned.
start(Host, Opts) ->
    case mod_mam_opt:db_type(Opts) of
        mnesia ->
            ?WARNING_MSG("Mnesia backend for ~ts is not recommended: "
                         "it's limited to 2GB and often gets corrupted "
                         "when reaching this limit. SQL backend is "
                         "recommended. Namely, for small servers SQLite "
                         "is a preferred choice because it's very easy "
                         "to configure.", [?MODULE]);
        _ ->
            ok
    end,
    Backend = gen_mod:db_mod(Opts, ?MODULE),
    case Backend:init(Host, Opts) of
        ok ->
            init_cache(Backend, Host, Opts),
            register_iq_handlers(Host),
            AddHook = fun({Hook, Fun, Prio}) ->
                              ejabberd_hooks:add(Hook, Host, ?MODULE, Fun, Prio)
                      end,
            %% Hooks that are always installed: {HookName, Handler, Priority}.
            lists:foreach(
              AddHook,
              [{sm_receive_packet, sm_receive_packet, 50},
               {user_receive_packet, user_receive_packet, 88},
               {user_send_packet, user_send_packet, 88},
               {user_send_packet, user_send_packet_strip_tag, 500},
               {offline_message_hook, offline_message, 49},
               {muc_filter_message, muc_filter_message, 50},
               {muc_process_iq, muc_process_iq, 50},
               {disco_sm_features, disco_sm_features, 50},
               {remove_user, remove_user, 50},
               {get_room_config, get_room_config, 50},
               {set_room_option, set_room_option, 50},
               {store_mam_message, store_mam_message, 100}]),
            %% Option-dependent hooks.
            case mod_mam_opt:assume_mam_usage(Opts) of
                true ->
                    AddHook({message_is_archived, message_is_archived, 50});
                false ->
                    ok
            end,
            case mod_mam_opt:clear_archive_on_room_destroy(Opts) of
                true ->
                    AddHook({remove_room, remove_room, 50});
                false ->
                    AddHook({check_create_room, check_create_room, 50})
            end,
            ejabberd_commands:register_commands(?MODULE, get_commands_spec()),
            ok;
        InitError ->
            InitError
    end.

%% Decide whether the archive preferences cache is used for Host.
%% If the backend module Mod exports the optional use_cache/1 callback, its
%% answer wins; otherwise the module option is consulted.
%% The probe must use arity 1: that is the arity declared by the behaviour
%% and the arity actually called below.
use_cache(Mod, Host) ->
    case erlang:function_exported(Mod, use_cache, 1) of
        true -> Mod:use_cache(Host);
        false -> mod_mam_opt:use_cache(Host)
    end.

%% Nodes on which cache entries for Host must be invalidated: the backend's
%% own answer when it exports cache_nodes/1, otherwise every cluster node.
cache_nodes(Mod, Host) ->
    HasCallback = erlang:function_exported(Mod, cache_nodes, 1),
    if HasCallback ->
            Mod:cache_nodes(Host);
       true ->
            ejabberd_cluster:get_nodes()
    end.

%% Create the archive_prefs_cache table when caching is enabled for Host,
%% or drop it when caching is disabled.
init_cache(Mod, Host, Opts) ->
    CacheTab = archive_prefs_cache,
    case use_cache(Mod, Host) of
        false ->
            ets_cache:delete(CacheTab);
        true ->
            ets_cache:new(CacheTab, cache_opts(Opts))
    end.

%% Translate the module's cache_* options into the option list handed to
%% ets_cache:new/2.
cache_opts(Opts) ->
    Size = mod_mam_opt:cache_size(Opts),
    Missed = mod_mam_opt:cache_missed(Opts),
    TTL = mod_mam_opt:cache_life_time(Opts),
    [{max_size, Size},
     {life_time, TTL},
     {cache_missed, Missed}].

%% gen_mod callback: undo everything start/2 installed for Host.
%% The ejabberd commands are only unregistered when no other host still has
%% this module loaded.
stop(Host) ->
    unregister_iq_handlers(Host),
    DelHook = fun({Hook, Fun, Prio}) ->
                      ejabberd_hooks:delete(Hook, Host, ?MODULE, Fun, Prio)
              end,
    %% Unconditional hooks: {HookName, Handler, Priority}.
    lists:foreach(
      DelHook,
      [{sm_receive_packet, sm_receive_packet, 50},
       {user_receive_packet, user_receive_packet, 88},
       {user_send_packet, user_send_packet, 88},
       {user_send_packet, user_send_packet_strip_tag, 500},
       {offline_message_hook, offline_message, 49},
       {muc_filter_message, muc_filter_message, 50},
       {muc_process_iq, muc_process_iq, 50},
       {disco_sm_features, disco_sm_features, 50},
       {remove_user, remove_user, 50},
       {get_room_config, get_room_config, 50},
       {set_room_option, set_room_option, 50},
       {store_mam_message, store_mam_message, 100}]),
    %% Option-dependent hooks.
    case mod_mam_opt:assume_mam_usage(Host) of
        true ->
            DelHook({message_is_archived, message_is_archived, 50});
        false ->
            ok
    end,
    case mod_mam_opt:clear_archive_on_room_destroy(Host) of
        true ->
            DelHook({remove_room, remove_room, 50});
        false ->
            DelHook({check_create_room, check_create_room, 50})
    end,
    case gen_mod:is_loaded_elsewhere(Host, ?MODULE) of
        true ->
            ok;
        false ->
            ejabberd_commands:unregister_commands(get_commands_spec())
    end.

%% gen_mod callback: apply a configuration change without restarting.
%% Re-initialises the backend when it changed, refreshes the cache and
%% adds/removes the message_is_archived hook when assume_mam_usage flipped.
reload(Host, NewOpts, OldOpts) ->
    NewBackend = gen_mod:db_mod(NewOpts, ?MODULE),
    OldBackend = gen_mod:db_mod(OldOpts, ?MODULE),
    case NewBackend of
        OldBackend ->
            ok;
        _ ->
            NewBackend:init(Host, NewOpts)
    end,
    init_cache(NewBackend, Host, NewOpts),
    AssumeNow = mod_mam_opt:assume_mam_usage(NewOpts),
    AssumedBefore = mod_mam_opt:assume_mam_usage(OldOpts),
    case {AssumeNow, AssumedBefore} of
        {false, true} ->
            ejabberd_hooks:delete(message_is_archived, Host, ?MODULE,
                                  message_is_archived, 50);
        {true, false} ->
            ejabberd_hooks:add(message_is_archived, Host, ?MODULE,
                               message_is_archived, 50);
        _ ->
            ok
    end.

%% gen_mod callback: this module declares no dependencies on other modules.
depends(_ServerHost, _ModOpts) ->
    [].

%% Register the MAM IQ handlers for every supported namespace, on both the
%% local (server) and the session-manager (user) IQ routers.
-spec register_iq_handlers(binary()) -> ok.
register_iq_handlers(Host) ->
    Handlers = [{?NS_MAM_TMP, process_iq_v0_2},
                {?NS_MAM_0, process_iq_v0_3},
                {?NS_MAM_1, process_iq_v0_3},
                {?NS_MAM_2, process_iq_v0_3}],
    lists:foreach(
      fun({NS, Fun}) ->
              gen_iq_handler:add_iq_handler(ejabberd_local, Host, NS,
                                            ?MODULE, Fun),
              gen_iq_handler:add_iq_handler(ejabberd_sm, Host, NS,
                                            ?MODULE, Fun)
      end, Handlers).

%% Remove the IQ handlers installed by register_iq_handlers/1.
-spec unregister_iq_handlers(binary()) -> ok.
unregister_iq_handlers(Host) ->
    lists:foreach(
      fun(NS) ->
              gen_iq_handler:remove_iq_handler(ejabberd_local, Host, NS),
              gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, NS)
      end, [?NS_MAM_TMP, ?NS_MAM_0, ?NS_MAM_1, ?NS_MAM_2]).

%% remove_user hook handler: ask the backend to remove the user's data and
%% evict the user's cached archive preferences when caching is in use.
-spec remove_user(binary(), binary()) -> ok.
remove_user(User, Server) ->
    LUser = jid:nodeprep(User),
    LServer = jid:nameprep(Server),
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    Backend:remove_user(LUser, LServer),
    case use_cache(Backend, LServer) of
        false ->
            ok;
        true ->
            Nodes = cache_nodes(Backend, LServer),
            ets_cache:delete(archive_prefs_cache, {LUser, LServer}, Nodes)
    end.

%% remove_room hook handler: ask the backend to remove the room's data.
%% The backend's return value is ignored.
-spec remove_room(binary(), binary(), binary()) -> ok.
remove_room(LServer, Name, Host) ->
    RoomName = jid:nodeprep(Name),
    RoomHost = jid:nameprep(Host),
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    _ = Backend:remove_room(LServer, RoomName, RoomHost),
    ok.

%% Remove the whole archive of User@Server.
%% Returns a human-readable binary on both success and failure; a binary
%% error reason from the backend is passed through unchanged.
-spec remove_mam_for_user(binary(), binary()) ->
    {ok, binary()} | {error, binary()}.
remove_mam_for_user(User, Server) ->
    LUser = jid:nodeprep(User),
    LServer = jid:nameprep(Server),
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    case Backend:remove_from_archive(LUser, LServer, none) of
        ok ->
            {ok, <<"MAM archive removed">>};
        {error, Reason} ->
            if is_binary(Reason) ->
                    {error, Reason};
               true ->
                    {error, <<"Db returned error">>}
            end
    end.

%% Remove from the archive of User@Server only the conversation with Peer
%% (a textual JID). Only the JID decoding is protected by the try: an
%% undecodable Peer yields {error, <<"Invalid peer JID">>}.
-spec remove_mam_for_user_with_peer(binary(), binary(), binary()) ->
    {ok, binary()} | {error, binary()}.
remove_mam_for_user_with_peer(User, Server, Peer) ->
    LUser = jid:nodeprep(User),
    LServer = jid:nameprep(Server),
    try jid:decode(Peer) of
        PeerJID ->
            Backend = get_module_host(LServer),
            case Backend:remove_from_archive(LUser, LServer, PeerJID) of
                ok ->
                    {ok, <<"MAM archive removed">>};
                {error, Reason} ->
                    if is_binary(Reason) ->
                            {error, Reason};
                       true ->
                            {error, <<"Db returned error">>}
                    end
            end
    catch
        _:_ ->
            {error, <<"Invalid peer JID">>}
    end.

%% Resolve the backend module for LServer. When this module is not loaded
%% on LServer itself, retry with the host that routes LServer.
get_module_host(LServer) ->
    try
        gen_mod:db_mod(LServer, ?MODULE)
    catch
        error:{module_not_loaded, ?MODULE, LServer} ->
            RouteHost = ejabberd_router:host_of_route(LServer),
            gen_mod:db_mod(RouteHost, ?MODULE)
    end.

%% get_room_config hook handler: append the room's current 'mam' setting to
%% the list of room configuration fields.
-spec get_room_config([muc_roomconfig:property()], mod_muc_room:state(),
		      jid(), binary()) -> [muc_roomconfig:property()].
get_room_config(Fields, RoomState, _From, _Lang) ->
    MamEnabled = (RoomState#state.config)#config.mam,
    Fields ++ [{mam, MamEnabled}].

%% set_room_option hook handler: for the 'mam' property, return the position
%% of the mam field in #config{} together with the new value; any other
%% property leaves the accumulator untouched.
-spec set_room_option({pos_integer(), _}, muc_roomconfig:property(), binary())
      -> {pos_integer(), _}.
set_room_option(_Previous, {mam, Enabled}, _Lang) ->
    {#config.mam, Enabled};
set_room_option(Previous, _OtherProperty, _Lang) ->
    Previous.

%% sm_receive_packet hook handler: make sure incoming messages carry a
%% stanza id; every other stanza passes through unchanged.
-spec sm_receive_packet(stanza()) -> stanza().
sm_receive_packet(#message{to = #jid{} = To} = Msg) ->
    init_stanza_id(Msg, To#jid.lserver);
sm_receive_packet(Other) ->
    Other.

%% user_receive_packet hook handler: archive an incoming message for the
%% receiving session's user and, when it was stored, tag it as archived.
%% Non-message stanzas pass through unchanged.
-spec user_receive_packet({stanza(), c2s_state()}) -> {stanza(), c2s_state()}.
user_receive_packet({#message{from = Peer} = Msg, #{jid := JID} = C2SState}) ->
    LUser = JID#jid.luser,
    LServer = JID#jid.lserver,
    %% store_msg/5 is only attempted when should_archive/2 says 'true'.
    Stored = (should_archive(Msg, LServer) =:= true)
        andalso (store_msg(Msg, LUser, LServer, Peer, recv) =:= ok),
    Out = if Stored ->
                  mark_stored_msg(Msg, JID);
             true ->
                  Msg
          end,
    {Out, C2SState};
user_receive_packet(Other) ->
    Other.

%% user_send_packet hook handler: assign a stanza id to an outgoing message,
%% archive it for the sender and, when it was stored, tag it as archived.
%% Non-message stanzas pass through unchanged.
-spec user_send_packet({stanza(), c2s_state()})
      -> {stanza(), c2s_state()}.
user_send_packet({#message{to = Peer} = Msg, #{jid := JID} = C2SState}) ->
    LUser = JID#jid.luser,
    LServer = JID#jid.lserver,
    Tagged = init_stanza_id(Msg, LServer),
    Out = case should_archive(Tagged, LServer) of
              false ->
                  Tagged;
              true ->
                  %% The stored copy carries the session JID as sender.
                  ToStore = xmpp:set_from_to(Tagged, JID, Peer),
                  case store_msg(ToStore, LUser, LServer, Peer, send) of
                      ok ->
                          mark_stored_msg(Tagged, JID);
                      _ ->
                          Tagged
                  end
          end,
    {Out, C2SState};
user_send_packet(Other) ->
    Other.

%% Late user_send_packet hook handler: drop the stanza_id metadata and the
%% archive tags issued by this server from an outgoing message.
-spec user_send_packet_strip_tag({stanza(), c2s_state()})
      -> {stanza(), c2s_state()}.
user_send_packet_strip_tag({#message{} = Msg, #{jid := JID} = C2SState}) ->
    NoMeta = xmpp:del_meta(Msg, stanza_id),
    {strip_my_stanza_id(NoMeta, JID#jid.lserver), C2SState};
user_send_packet_strip_tag(Other) ->
    Other.

%% offline_message_hook handler: archive a message addressed to an offline
%% user. When it was stored the accumulator becomes {archived, TaggedPkt};
%% otherwise the accumulator is returned unchanged.
-spec offline_message({any(), message()}) -> {any(), message()}.
offline_message({_Action, #message{from = Peer, to = To} = Msg} = Acc) ->
    #jid{luser = LUser, lserver = LServer} = To,
    case should_archive(Msg, LServer) of
        false ->
            Acc;
        true ->
            case store_msg(Msg, LUser, LServer, Peer, recv) of
                ok ->
                    {archived, mark_stored_msg(Msg, To)};
                _ ->
                    Acc
            end
    end.

%% muc_filter_message hook handler: assign a stanza id to a room message
%% and, when the room has MAM enabled, store a copy (with JID-revealing
%% <x/> elements removed) and tag the message as archived.
-spec muc_filter_message(message(), mod_muc_room:state(),
			 binary()) -> message().
muc_filter_message(#message{from = From} = Msg,
                   #state{config = Config, jid = RoomJID} = MUCState,
                   FromNick) ->
    Tagged = init_stanza_id(Msg, RoomJID#jid.lserver),
    if Config#config.mam ->
            Sanitized = strip_x_jid_tags(Tagged),
            case store_muc(MUCState, Sanitized, RoomJID, From, FromNick) of
                ok ->
                    mark_stored_msg(Tagged, RoomJID);
                _ ->
                    Tagged
            end;
       true ->
            Tagged
    end;
muc_filter_message(Other, _MUCState, _FromNick) ->
    Other.

%% Produce an archive id: the current system time in microseconds.
-spec make_id() -> integer().
make_id() ->
    NowUsec = erlang:system_time(microsecond),
    NowUsec.

%% Read the stanza id stored in a message's metadata. Fails with
%% function_clause when the message carries no stanza_id meta key.
-spec get_stanza_id(stanza()) -> integer().
get_stanza_id(#message{meta = #{stanza_id := StanzaID}}) ->
    StanzaID.

%% Make sure a message carries a stanza_id meta value.
%% Messages that already have one, or that are flagged from_offline, are
%% returned as is. Otherwise archive tags issued by LServer are stripped
%% and a fresh id is stored in the metadata.
-spec init_stanza_id(stanza(), binary()) -> stanza().
init_stanza_id(#message{meta = #{stanza_id := _}} = Msg, _LServer) ->
    Msg;
init_stanza_id(#message{meta = #{from_offline := true}} = Msg, _LServer) ->
    Msg;
init_stanza_id(Stanza, LServer) ->
    NewID = make_id(),
    Clean = strip_my_stanza_id(Stanza, LServer),
    xmpp:put_meta(Clean, stanza_id, NewID).

%% Prepend <archived/> and <stanza-id/> elements, both issued by the bare
%% form of JID and carrying ID, to the stanza's sub-elements.
-spec set_stanza_id(stanza(), jid(), binary()) -> stanza().
set_stanza_id(Pkt, JID, ID) ->
    By = jid:remove_resource(JID),
    Tags = [#mam_archived{by = By, id = ID},
            #stanza_id{by = By, id = ID}],
    xmpp:set_els(Pkt, Tags ++ xmpp:get_els(Pkt)).

%% Tag a stored message: add the archive elements built from its stanza_id
%% meta value and set the mam_archived meta flag.
-spec mark_stored_msg(message(), jid()) -> message().
mark_stored_msg(#message{meta = #{stanza_id := NumericID}} = Msg, JID) ->
    Tagged = set_stanza_id(Msg, JID, integer_to_binary(NumericID)),
    xmpp:put_meta(Tagged, mam_archived, true).

% IQ handler for MAM v0.2.
% An archive query is an IQ of type 'get' whose sender and recipient share
% the same server; anything else goes to the preferences handler.
process_iq_v0_2(#iq{type = get,
                    from = #jid{lserver = Host},
                    to = #jid{lserver = Host},
                    sub_els = [#mam_query{}]} = Req) ->
    process_iq(Host, Req, chat);
process_iq_v0_2(Req) ->
    process_iq(Req).

% IQ handler for MAM v0.3 and later.
% With sender and recipient on the same server: type 'set' runs an archive
% query, type 'get' returns the query form. Anything else goes to the
% preferences handler.
process_iq_v0_3(#iq{type = set,
                    from = #jid{lserver = Host},
                    to = #jid{lserver = Host},
                    sub_els = [#mam_query{}]} = Req) ->
    process_iq(Host, Req, chat);
process_iq_v0_3(#iq{type = get,
                    from = #jid{lserver = Host},
                    to = #jid{lserver = Host},
                    sub_els = [#mam_query{}]} = Req) ->
    process_iq(Host, Req);
process_iq_v0_3(Req) ->
    process_iq(Req).

%% muc_process_iq hook handler for MAM queries addressed to a room.
%% An archive query ('set' for newer namespaces, 'get' for the tmp
%% namespace) is answered only if the sender may enter the room; a 'get'
%% on a newer namespace returns the query form. Other IQs pass through.
-spec muc_process_iq(ignore | iq(), mod_muc_room:state()) -> ignore | iq().
muc_process_iq(#iq{type = Type, lang = Lang, from = From,
                   sub_els = [#mam_query{xmlns = NS}]} = Req,
               MUCState)
  when Type == set, NS /= ?NS_MAM_TMP;
       Type == get, NS == ?NS_MAM_TMP ->
    case may_enter_room(From, MUCState) of
        false ->
            Text = ?T("Only members may query archives of this room"),
            xmpp:make_error(Req, xmpp:err_forbidden(Text, Lang));
        true ->
            ServerHost = MUCState#state.server_host,
            Role = mod_muc_room:get_role(From, MUCState),
            process_iq(ServerHost, Req, {groupchat, Role, MUCState})
    end;
muc_process_iq(#iq{type = get,
                   sub_els = [#mam_query{xmlns = NS}]} = Req,
               MUCState) when NS /= ?NS_MAM_TMP ->
    process_iq(MUCState#state.server_host, Req);
muc_process_iq(Other, _MUCState) ->
    Other.

%% Turn a #mam_query{} into a list of query properties.
%% - tmp namespace: the filters are taken from the record fields;
%% - a query carrying a data form: the form is decoded with mam_query, and
%%   a decoding failure becomes a bad-request error;
%% - otherwise: no filters.
parse_query(#mam_query{xmlns = ?NS_MAM_TMP,
                       start = Start, 'end' = End,
                       with = With, withtext = Text}, _Lang) ->
    Filters = [{start, Start}, {'end', End},
               {with, With}, {withtext, Text}],
    {ok, Filters};
parse_query(#mam_query{xdata = #xdata{} = XData}, Lang) ->
    %% Force the FORM_TYPE field so that mam_query:decode/1 accepts the form.
    FormType = #xdata_field{var = <<"FORM_TYPE">>,
                            type = hidden, values = [?NS_MAM_1]},
    Normalized = xmpp_util:set_xdata_field(FormType, XData),
    try mam_query:decode(Normalized#xdata.fields) of
        Decoded ->
            {ok, Decoded}
    catch
        _:{mam_query, Why} ->
            Reason = mam_query:format_error(Why),
            {error, xmpp:err_bad_request(Reason, Lang)}
    end;
parse_query(#mam_query{}, _Lang) ->
    {ok, []}.

%% disco_sm_features hook handler: advertise the MAM and stanza-id features
%% when a user queries their own bare account with an empty node.
disco_sm_features(empty, From, To, Node, Lang) ->
    %% Treat "no features yet" as an empty result and fall through.
    disco_sm_features({result, []}, From, To, Node, Lang);
disco_sm_features({result, Known},
                  #jid{luser = User, lserver = Host},
                  #jid{luser = User, lserver = Host}, <<"">>, _Lang) ->
    Ours = [?NS_MAM_TMP, ?NS_MAM_0, ?NS_MAM_1, ?NS_MAM_2, ?NS_SID_0],
    {result, Ours ++ Known};
disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
    Acc.

%% message_is_archived hook handler: once some handler answered 'true' the
%% answer is kept; otherwise the message counts as archived only when
%% assume_mam_usage is enabled for the session's server and the message
%% carries a stanza-id issued by that server.
-spec message_is_archived(boolean(), c2s_state(), message()) -> boolean().
message_is_archived(true, _C2SState, _Pkt) ->
    true;
message_is_archived(false, #{lserver := LServer}, Pkt) ->
    case mod_mam_opt:assume_mam_usage(LServer) of
        false ->
            false;
        true ->
            is_archived(Pkt, LServer)
    end.

%% Start a background task that deletes, in batches, the messages of the
%% given Type (<<"chat">>, <<"groupchat">> or <<"all">>) older than Days
%% days from the archive of Server.
%%
%% The task is registered with ejabberd_batch under the key {mam, LServer}.
%% Its state is the tuple {LServer, Type, TimeStamp, BatchSize, BackendState}
%% and each step calls the backend's delete_old_messages_batch/4 or /5,
%% whichever is exported (the 4-arity variant is preferred).
%%
%% Returns {ok, ""} when the task was registered, or
%% {error, "Operation in progress"} when one is already running.
delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"chat">>;
                                                                  Type == <<"groupchat">>;
                                                                  Type == <<"all">> ->
    CurrentTime = make_id(),
    Diff = Days * 24 * 60 * 60 * 1000000,
    TimeStamp = misc:usec_to_now(CurrentTime - Diff),
    TypeA = misc:binary_to_atom(Type),
    LServer = jid:nameprep(Server),
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    Step =
        fun({L, T, St, B, IS} = S) ->
                case {erlang:function_exported(Mod, delete_old_messages_batch, 4),
                      erlang:function_exported(Mod, delete_old_messages_batch, 5)} of
                    {true, _} ->
                        case Mod:delete_old_messages_batch(L, St, T, B) of
                            {ok, Count} ->
                                {ok, S, Count};
                            {error, _} = E ->
                                E
                        end;
                    {_, true} ->
                        case Mod:delete_old_messages_batch(L, St, T, B, IS) of
                            {ok, IS2, Count} ->
                                %% The next state must keep the same field
                                %% order as the head of this fun (type before
                                %% timestamp); otherwise the following batch
                                %% would call the backend with the two
                                %% arguments swapped.
                                {ok, {L, T, St, B, IS2}, Count};
                            {error, _} = E ->
                                E
                        end;
                    _ ->
                        {error, not_implemented_for_backend}
                end
        end,
    InitialState = {LServer, TypeA, TimeStamp, BatchSize, none},
    case ejabberd_batch:register_task({mam, LServer}, 0, Rate, InitialState, Step) of
        ok ->
            {ok, ""};
        {error, in_progress} ->
            {error, "Operation in progress"}
    end.
%% Describe, as a flat string, the state of the batch deletion task
%% registered for Server under the key {mam, LServer}.
delete_old_messages_status(Server) ->
    LServer = jid:nameprep(Server),
    Render = fun(Format, Args) ->
                     lists:flatten(io_lib:format(Format, Args))
             end,
    case ejabberd_batch:task_status({mam, LServer}) of
        not_started ->
            "Operation not started";
        {failed, Steps, Error} ->
            Render("Operation failed after deleting ~p messages with error ~p",
                   [Steps, misc:format_val(Error)]);
        {aborted, Steps} ->
            Render("Operation was aborted after deleting ~p messages",
                   [Steps]);
        {working, Steps} ->
            Render("Operation in progress, deleted ~p messages",
                   [Steps]);
        {completed, Steps} ->
            Render("Operation was completed after deleting ~p messages",
                   [Steps])
    end.

%% Abort the batch deletion task registered for Server, if any, and report
%% the outcome as a string.
delete_old_messages_abort(Server) ->
    TaskKey = {mam, jid:nameprep(Server)},
    case ejabberd_batch:abort_task(TaskKey) of
        not_started ->
            "No task running";
        aborted ->
            "Operation aborted"
    end.

%% Delete, on every configured host, the messages of the given type older
%% than Days days. SQL backends are invoked once per host; every other
%% backend type is invoked once with the pseudo-host 'global'.
%% Returns 'ok' when every backend returned 'ok', otherwise the first
%% non-ok backend result; an unknown type yields 'unsupported_type'.
delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
                                        TypeBin == <<"groupchat">>;
                                        TypeBin == <<"all">> ->
    Now = make_id(),
    AgeUsec = Days * 24 * 60 * 60 * 1000000,
    Cutoff = misc:usec_to_now(Now - AgeUsec),
    Type = misc:binary_to_atom(TypeBin),
    %% usort/1 collapses the duplicate {Backend, global} entries.
    Targets = lists:usort(
                [case mod_mam_opt:db_type(Host) of
                     sql -> {sql, Host};
                     Other -> {Other, global}
                 end || Host <- ejabberd_option:hosts()]),
    Results = [begin
                   Backend = gen_mod:db_mod(DBType, ?MODULE),
                   Backend:delete_old_messages(ServerHost, Cutoff, Type)
               end || {DBType, ServerHost} <- Targets],
    case lists:dropwhile(fun(Res) -> Res == ok end, Results) of
        [] -> ok;
        [FirstFailure | _] -> FirstFailure
    end;
delete_old_messages(_TypeBin, _Days) ->
    unsupported_type.

%% Delegate to the export/1 function of the backend configured for LServer.
export(LServer) ->
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    Backend:export(LServer).

%% Ask the backend whether the archive of User@Server is empty.
-spec is_empty_for_user(binary(), binary()) -> boolean().
is_empty_for_user(User, Server) ->
    PreppedUser = jid:nodeprep(User),
    PreppedServer = jid:nameprep(Server),
    Backend = gen_mod:db_mod(PreppedServer, ?MODULE),
    Backend:is_empty_for_user(PreppedUser, PreppedServer).

%% Ask the backend of LServer whether the archive of room Name@Host is empty.
-spec is_empty_for_room(binary(), binary(), binary()) -> boolean().
is_empty_for_room(LServer, Name, Host) ->
    RoomName = jid:nodeprep(Name),
    RoomHost = jid:nameprep(Host),
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    Backend:is_empty_for_room(LServer, RoomName, RoomHost).

%% check_create_room hook handler: a room may be created only when the
%% accumulator still allows it and the room has no archived messages.
%% 'andalso' short-circuits, so the backend is not queried when the
%% accumulator is already false; the result is the same as with 'and'.
-spec check_create_room(boolean(), binary(), binary(), binary()) -> boolean().
check_create_room(Acc, ServerHost, RoomID, Host) ->
    Acc andalso is_empty_for_room(ServerHost, RoomID, Host).

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Answer a request for the MAM query form: a data form listing the common
%% fields (with/start/end) plus the backend's extended fields, with
%% FORM_TYPE set to the namespace of the request.
process_iq(LServer, #iq{sub_els = [#mam_query{xmlns = NS}]} = IQ) ->
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    Common = [{with, undefined},
              {start, undefined},
              {'end', undefined}],
    Encoded = mam_query:encode(Common ++ Backend:extended_fields()),
    FormType = #xdata_field{var = <<"FORM_TYPE">>, type = hidden, values = [NS]},
    Form = xmpp_util:set_xdata_field(FormType,
                                     #xdata{type = form, fields = Encoded}),
    xmpp:make_iq_result(IQ, #mam_query{xmlns = NS, xdata = Form}).

% Archiving preferences (both v0.2 & v0.3).
% - 'set' without a default: bad-request;
% - 'set' from a user to their own server: checked against the
%   access_preferences rule, then written;
% - 'get' from a user to their own server: current preferences;
% - anything else: not-allowed.
process_iq(#iq{type = set, lang = Lang,
               sub_els = [#mam_prefs{default = undefined, xmlns = NS}]} = IQ) ->
    Missing = {missing_attr, <<"default">>, <<"prefs">>, NS},
    Reason = xmpp:io_format_error(Missing),
    xmpp:make_error(IQ, xmpp:err_bad_request(Reason, Lang));
process_iq(#iq{type = set, lang = Lang,
               from = #jid{luser = LUser, lserver = LServer},
               to = #jid{lserver = LServer},
               sub_els = [#mam_prefs{xmlns = NS,
                                     default = Default,
                                     always = Always0,
                                     never = Never0}]} = IQ) ->
    Rule = mod_mam_opt:access_preferences(LServer),
    Owner = jid:make(LUser, LServer),
    case acl:match_rule(LServer, Rule, Owner) of
        deny ->
            Denied = ?T("MAM preference modification denied by service policy"),
            xmpp:make_error(IQ, xmpp:err_forbidden(Denied, Lang));
        allow ->
            Always = lists:usort(get_jids(Always0)),
            Never = lists:usort(get_jids(Never0)),
            case write_prefs(LUser, LServer, LServer, Default, Always, Never) of
                ok ->
                    xmpp:make_iq_result(IQ, prefs_el(Default, Always, Never, NS));
                _Failure ->
                    DbError = ?T("Database failure"),
                    xmpp:make_error(IQ, xmpp:err_internal_server_error(DbError, Lang))
            end
    end;
process_iq(#iq{type = get, lang = Lang,
               from = #jid{luser = LUser, lserver = LServer},
               to = #jid{lserver = LServer},
               sub_els = [#mam_prefs{xmlns = NS}]} = IQ) ->
    case get_prefs(LUser, LServer) of
        {error, _} ->
            DbError = ?T("Database failure"),
            xmpp:make_error(IQ, xmpp:err_internal_server_error(DbError, Lang));
        {ok, #archive_prefs{default = Default,
                            always = Always,
                            never = Never}} ->
            xmpp:make_iq_result(IQ, prefs_el(Default, Always, Never, NS))
    end;
process_iq(IQ) ->
    xmpp:make_error(IQ, xmpp:err_not_allowed()).

%% Run an archive query. For user archives (MsgType == chat)
%% maybe_activate_mam/2 is called first and a failure there aborts the
%% query. Queries using the RSM <index/> element are rejected as
%% unimplemented; otherwise the query is parsed, the page size is passed
%% through limit_max/2 and the result is sent by select_and_send/6.
process_iq(LServer, #iq{from = #jid{luser = LUser}, lang = Lang,
                        sub_els = [SubEl]} = IQ, MsgType) ->
    Activation = if MsgType == chat ->
                         maybe_activate_mam(LUser, LServer);
                    true ->
                         ok
                 end,
    case Activation of
        {error, _} ->
            DbError = ?T("Database failure"),
            xmpp:make_error(IQ, xmpp:err_internal_server_error(DbError, Lang));
        ok ->
            case SubEl of
                #mam_query{rsm = #rsm_set{index = Index}} when is_integer(Index) ->
                    Unsupported = ?T("Unsupported <index/> element"),
                    xmpp:make_error(
                      IQ, xmpp:err_feature_not_implemented(Unsupported, Lang));
                #mam_query{rsm = RSM, flippage = FlipPage, xmlns = NS} ->
                    case parse_query(SubEl, Lang) of
                        {error, Err} ->
                            xmpp:make_error(IQ, Err);
                        {ok, Query} ->
                            Limited = limit_max(RSM, NS),
                            select_and_send(LServer, Query, Limited,
                                            FlipPage, IQ, MsgType)
                    end
            end
    end.

%% Decide whether a (non-MUC) message is a candidate for archiving on
%% LServer. Clause order matters: the type and from_offline checks run
%% before any content inspection.
-spec should_archive(message(), binary()) -> boolean().
%% Error messages, groupchat messages and messages flagged from_offline are
%% never archived here.
should_archive(#message{type = error}, _LServer) ->
    false;
should_archive(#message{type = groupchat}, _LServer) ->
    false;
should_archive(#message{meta = #{from_offline := true}}, _LServer) ->
    false;
should_archive(#message{body = Body, subject = Subject,
			type = Type} = Pkt, LServer) ->
    %% A message already carrying a stanza-id issued by LServer is not
    %% archived a second time.
    case is_archived(Pkt, LServer) of
	true ->
	    false;
	false ->
	    %% An explicit hint wins over every content-based rule below.
	    case check_store_hint(Pkt) of
		store ->
		    true;
		no_store ->
		    false;
		%% Headlines are only archived when explicitly hinted.
		none when Type == headline ->
		    false;
		none ->
		    %% Any body or subject text is enough to archive.
		    case xmpp:get_text(Body) /= <<>> orelse
			 xmpp:get_text(Subject) /= <<>> of
			true ->
			    true;
			_ ->
			    %% No text: if the packet unwraps to a message,
			    %% decide on that inner message instead.
			    case misc:unwrap_mucsub_message(Pkt) of
				#message{type = groupchat} = Msg ->
				    %% Re-typed as chat so that the groupchat
				    %% clause above does not reject it.
				    should_archive(Msg#message{type = chat}, LServer);
				#message{} = Msg ->
				    should_archive(Msg, LServer);
				_ ->
				    misc:is_mucsub_message(Pkt)
			    end
		    end
	    end
    end;
%% Anything that is not a #message{} is never archived.
should_archive(_, _LServer) ->
    false.

%% Remove from a stanza the <archived/> (tmp MAM namespace) and
%% <stanza-id/> elements issued by LServer. Such elements that fail to
%% decode are removed as well; elements issued by other servers and all
%% other elements are kept.
-spec strip_my_stanza_id(stanza(), binary()) -> stanza().
strip_my_stanza_id(Pkt, LServer) ->
    %% true when the tag was issued by another server, i.e. must be kept.
    IssuedElsewhere =
        fun(El) ->
                try xmpp:decode(El) of
                    #mam_archived{by = By} ->
                        By#jid.lserver /= LServer;
                    #stanza_id{by = By} ->
                        By#jid.lserver /= LServer
                catch
                    _:{xmpp_codec, _} ->
                        false
                end
        end,
    Keep =
        fun(El) ->
                case {xmpp:get_name(El), xmpp:get_ns(El)} of
                    {<<"archived">>, ?NS_MAM_TMP} ->
                        IssuedElsewhere(El);
                    {<<"stanza-id">>, ?NS_SID_0} ->
                        IssuedElsewhere(El);
                    _ ->
                        true
                end
        end,
    xmpp:set_els(Pkt, lists:filter(Keep, xmpp:get_els(Pkt))).

%% Remove from a stanza every MUC <x/> element (user, admin or owner
%% namespace) that contains at least one item with a JID. <x/> elements
%% that fail to decode, or that belong to other namespaces, are kept.
-spec strip_x_jid_tags(stanza()) -> stanza().
strip_x_jid_tags(Pkt) ->
    %% Items carried by a MUC <x/> element; [] when there are none to inspect.
    ItemsOf =
        fun(El) ->
                case xmpp:get_ns(El) of
                    NS when NS == ?NS_MUC_USER;
                            NS == ?NS_MUC_ADMIN;
                            NS == ?NS_MUC_OWNER ->
                        try xmpp:decode(El) of
                            #muc_user{items = Is} -> Is;
                            #muc_admin{items = Is} -> Is;
                            #muc_owner{items = Is} -> Is
                        catch
                            _:{xmpp_codec, _} ->
                                []
                        end;
                    _ ->
                        []
                end
        end,
    RevealsJid = fun(#muc_item{jid = JID}) -> JID /= undefined end,
    Keep =
        fun(El) ->
                case xmpp:get_name(El) of
                    <<"x">> ->
                        not lists:any(RevealsJid, ItemsOf(El));
                    _ ->
                        true
                end
        end,
    xmpp:set_els(Pkt, lists:filter(Keep, xmpp:get_els(Pkt))).

%% Apply a user's archiving preferences to a conversation peer.
%% The 'always' list wins over the 'never' list; when the bare peer JID is
%% in neither, the default policy decides. With the 'roster' default the
%% peer is archived when the roster subscription is both, from or to.
-spec should_archive_peer(binary(), binary(),
			  #archive_prefs{}, jid()) -> boolean().
should_archive_peer(LUser, LServer,
                    #archive_prefs{default = Default,
                                   always = Always,
                                   never = Never},
                    Peer) ->
    BarePeer = jid:remove_resource(jid:tolower(Peer)),
    Forced = lists:member(BarePeer, Always),
    %% The 'never' list is only consulted when 'always' did not match.
    Blocked = (not Forced) andalso lists:member(BarePeer, Never),
    if Forced ->
            true;
       Blocked ->
            false;
       Default == always ->
            true;
       Default == never ->
            false;
       Default == roster ->
            {Subscription, _, _} = ejabberd_hooks:run_fold(
                                     roster_get_jid_info,
                                     LServer, {none, none, []},
                                     [LUser, LServer, Peer]),
            lists:member(Subscription, [both, from, to])
    end.

%% Decide whether a room message is stored: only groupchat messages, and
%% then an explicit store/no-store hint wins; without a hint the message is
%% stored when its body or subject has text.
-spec should_archive_muc(message()) -> boolean().
should_archive_muc(#message{type = groupchat,
                            body = Body, subject = Subj} = Pkt) ->
    case check_store_hint(Pkt) of
        no_store ->
            false;
        store ->
            true;
        none ->
            %% The subject is only inspected when the body is empty.
            xmpp:get_text(Body) /= <<"">>
                orelse xmpp:get_text(Subj) /= <<"">>
    end;
should_archive_muc(_NotGroupchat) ->
    false.

%% Classify a message's processing hints: 'store' takes precedence over the
%% no-store family; 'none' means neither kind of hint is present.
-spec check_store_hint(message()) -> store | no_store | none.
check_store_hint(Pkt) ->
    WantsStore = has_store_hint(Pkt),
    %% The no-store hints are only looked up when there is no store hint.
    ForbidsStore = (not WantsStore) andalso has_no_store_hint(Pkt),
    if WantsStore -> store;
       ForbidsStore -> no_store;
       true -> none
    end.

%% True when the message carries a 'store' hint element.
-spec has_store_hint(message()) -> boolean().
has_store_hint(Message) ->
    StoreHint = #hint{type = 'store'},
    xmpp:has_subtag(Message, StoreHint).

%% True when the message carries any hint of the no-store family.
%% The hint types are checked in order and the search stops at the first hit.
-spec has_no_store_hint(message()) -> boolean().
has_no_store_hint(Message) ->
    lists:any(
      fun(HintType) ->
              xmpp:has_subtag(Message, #hint{type = HintType})
      end,
      ['no-store', 'no-storage',
       'no-permanent-store', 'no-permanent-storage']).

%% True when the packet already carries a stanza-id element whose `by'
%% JID has LServer as its server part.
-spec is_archived(message(), binary()) -> boolean().
is_archived(Pkt, LServer) ->
    Template = #stanza_id{by = #jid{}},
    case xmpp:get_subtag(Pkt, Template) of
        #stanza_id{by = #jid{lserver = Host}} when Host =:= LServer ->
            true;
        _Other ->
            false
    end.

%% Check whether From would be allowed into the room described by MUCState.
%% When the room config has members_only = false, anyone whose affiliation
%% is not `outcast' may enter; in every other case the decision is
%% delegated to mod_muc_room:is_occupant_or_admin/2.
-spec may_enter_room(jid(), mod_muc_room:state()) -> boolean().
may_enter_room(From, MUCState) ->
    case MUCState of
        #state{config = #config{members_only = false}} ->
            outcast /= mod_muc_room:get_affiliation(From, MUCState);
        _ ->
            mod_muc_room:is_occupant_or_admin(From, MUCState)
    end.

%% Store a one-to-one message in the archive of LUser@LServer if that
%% user's preferences allow it.
%%
%% Returns `ok' when the message is (or already was) stored and `pass'
%% when it was not: preferences could not be read, the peer is excluded,
%% or the store_mam_message hook did not hand back a #message{}.
-spec store_msg(message(), binary(), binary(), jid(), send | recv)
      -> ok | pass | any().
store_msg(Pkt, LUser, LServer, Peer, Dir) ->
    case get_prefs(LUser, LServer) of
        {error, _} ->
            pass;
        {ok, Prefs} ->
            %% Only honour the in_muc_mam marker when user mucsub messages
            %% are served from the MUC archive.
            InMucArchive =
                mod_mam_opt:user_mucsub_from_muc_archive(LServer)
                andalso xmpp:get_meta(Pkt, in_muc_mam, false),
            Wanted = should_archive_peer(LUser, LServer, Prefs, Peer),
            case {Wanted, Pkt, InMucArchive} of
                {false, _, _} ->
                    pass;
                {true, #message{meta = #{sm_copy := true}}, _} ->
                    ok; % Already stored.
                {true, _, true} ->
                    ok; % Stored in muc archive.
                {true, _, _} ->
                    HookArgs = [LUser, LServer, Peer, <<"">>, chat, Dir],
                    case ejabberd_hooks:run_fold(store_mam_message,
                                                 LServer, Pkt, HookArgs) of
                        #message{} -> ok;
                        _ -> pass
                    end
            end
    end.

%% Store a groupchat message in the archive of the room RoomJID, provided
%% should_archive_muc/1 accepts it.  The store_mam_message hook is run on
%% the room's server host; `ok' is returned when the hook yields a
%% #message{}, `pass' otherwise.
-spec store_muc(mod_muc_room:state(), message(), jid(), jid(), binary())
      -> ok | pass | any().
store_muc(MUCState, Pkt, RoomJID, Peer, Nick) ->
    case should_archive_muc(Pkt) of
        false ->
            pass;
        true ->
            {RoomUser, RoomHost, _} = jid:tolower(RoomJID),
            ServerHost = MUCState#state.server_host,
            HookArgs = [RoomUser, RoomHost, Peer, Nick, groupchat, recv],
            case ejabberd_hooks:run_fold(store_mam_message,
                                         ServerHost, Pkt, HookArgs) of
                #message{} -> ok;
                _ -> pass
            end
    end.

%% Encode the packet and hand it to the configured database backend for
%% storage under the archive owner {U, S}.  The backend's return value is
%% not inspected: the packet is always returned unchanged.
store_mam_message(Pkt, U, S, Peer, Nick, Type, Dir) ->
    Host = ejabberd_router:host_of_route(S),
    StanzaId = get_stanza_id(Pkt),
    Encoded = xmpp:encode(Pkt),
    Backend = gen_mod:db_mod(Host, ?MODULE),
    Backend:store(Encoded, Host, {U, S}, Type, Peer, Nick, Dir, StanzaId),
    Pkt.

%% Persist archiving preferences for LUser@LServer through the backend
%% selected for Host.  After a successful write the cached preferences
%% entry is dropped (when caching is in use).  Any backend result other
%% than `ok' is reported as {error, db_failure}.
write_prefs(LUser, LServer, Host, Default, Always, Never) ->
    US = {LUser, LServer},
    NewPrefs = #archive_prefs{us = US,
                              default = Default,
                              always = Always,
                              never = Never},
    Backend = gen_mod:db_mod(Host, ?MODULE),
    case Backend:write_prefs(LUser, LServer, NewPrefs, Host) of
        ok ->
            case use_cache(Backend, LServer) of
                false ->
                    ok;
                true ->
                    ets_cache:delete(archive_prefs_cache, US,
                                     cache_nodes(Backend, LServer))
            end;
        _Failure ->
            {error, db_failure}
    end.

%% Fetch the archiving preferences of LUser@LServer, going through the
%% preferences cache when it is enabled.
%%
%% Returns {ok, #archive_prefs{}} or {error, db_failure}.  When the backend
%% has no record for the user (`error'), a preferences record is
%% synthesised: its default is `never' if request_activates_archiving is
%% enabled, otherwise the module's `default' option.
get_prefs(LUser, LServer) ->
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    Fetch = fun() -> Backend:get_prefs(LUser, LServer) end,
    Lookup = case use_cache(Backend, LServer) of
                 true ->
                     ets_cache:lookup(archive_prefs_cache,
                                      {LUser, LServer}, Fetch);
                 false ->
                     Fetch()
             end,
    case Lookup of
        {ok, _Prefs} = Found ->
            Found;
        {error, _} ->
            {error, db_failure};
        error ->
            Policy = case mod_mam_opt:request_activates_archiving(LServer) of
                         true -> never;
                         false -> mod_mam_opt:default(LServer)
                     end,
            {ok, #archive_prefs{us = {LUser, LServer}, default = Policy}}
    end.

%% Build the #mam_prefs{} element for the given policy, converting the
%% entries of both JID lists with jid:make/1.
prefs_el(Default, Always, Never, NS) ->
    ToJids = fun(LJids) -> [jid:make(LJid) || LJid <- LJids] end,
    AlwaysJids = ToJids(Always),
    NeverJids = ToJids(Never),
    #mam_prefs{default = Default,
               always = AlwaysJids,
               never = NeverJids,
               xmlns = NS}.

%% When the request_activates_archiving option is enabled, make sure the
%% user has a stored preferences record: if the backend has none yet, one
%% is written using the module's `default' policy and empty always/never
%% lists.  Returns `ok' when nothing had to be done, {error, db_failure}
%% when the lookup failed, and otherwise the result of write_prefs/6.
maybe_activate_mam(LUser, LServer) ->
    case mod_mam_opt:request_activates_archiving(LServer) of
        false ->
            ok;
        true ->
            Backend = gen_mod:db_mod(LServer, ?MODULE),
            Fetch = fun() -> Backend:get_prefs(LUser, LServer) end,
            Lookup = case use_cache(Backend, LServer) of
                         true ->
                             ets_cache:lookup(archive_prefs_cache,
                                              {LUser, LServer}, Fetch);
                         false ->
                             Fetch()
                     end,
            case Lookup of
                {ok, _Prefs} ->
                    ok;
                {error, _} ->
                    {error, db_failure};
                error ->
                    Policy = mod_mam_opt:default(LServer),
                    write_prefs(LUser, LServer, LServer, Policy, [], [])
            end
    end.

%% Run an archive query on behalf of the IQ sender and deliver the result.
%%
%% For `chat' queries the requester's own JID is used as the archive JID;
%% for every other message type the IQ's `to' address is used.  Results
%% are sorted on their second tuple element, reversed when FlipPage is
%% true, and passed to send/4.  A select error yields an
%% internal-server-error reply.
select_and_send(LServer, Query, RSM, FlipPage, #iq{from = From, to = To} = IQ, MsgType) ->
    ArchiveJid = case MsgType of
                     chat -> From;
                     _ -> To
                 end,
    case select(LServer, From, ArchiveJid, Query, RSM, MsgType) of
        {error, _} ->
            Txt = ?T("Database failure"),
            xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, IQ#iq.lang));
        {Msgs, IsComplete, Count} ->
            Ascending = lists:keysort(2, Msgs),
            Page = case FlipPage of
                       true -> lists:reverse(Ascending);
                       false -> Ascending
                   end,
            send(Page, Count, IsComplete, IQ)
    end.

%% Convenience wrapper around select/7 that passes `all' as the flags.
select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) ->
    Flags = all,
    select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags).

%% Query an archive.  Returns {Entries, IsComplete, Count}, or whatever
%% error the underlying selection reports.
%%
%% Clause 1: a room whose config has mam = false.  The answer is built from
%% the room's in-memory history queue instead of the database: entries are
%% filtered by the query's start/end interval and the RSM bounds, converted
%% with msg_to_el/4 (conversion failures are dropped), and finally limited
%% by the RSM `max'.  Count is the length of the whole history queue.
%%
%% Clause 2: everything else.  Queries that might expose a real JID return
%% an empty, complete result; `chat' queries go through
%% select_with_mucsub/6 when user_mucsub_from_muc_archive is enabled; all
%% remaining queries are served by db_select/7.
select(_LServer, JidRequestor, JidArchive, Query, RSM,
       {groupchat, _Role, #state{config = #config{mam = false},
                                 history = History}} = MsgType, _Flags) ->
    Start = proplists:get_value(start, Query),
    End = proplists:get_value('end', Query),
    #lqueue{queue = Queue} = History,
    Total = p1_queue:len(Queue),
    ToEntry =
        fun({Nick, Pkt, _HaveSubject, Now, _Size}) ->
                USec = misc:now_to_usec(Now),
                %% Both predicates are always evaluated (strict `and').
                case match_interval(Now, Start, End) and
                     match_rsm(Now, RSM) of
                    false ->
                        false;
                    true ->
                        Id = integer_to_binary(USec),
                        Archived = #archive_msg{id = Id,
                                                type = groupchat,
                                                timestamp = Now,
                                                peer = undefined,
                                                nick = Nick,
                                                packet = Pkt},
                        case msg_to_el(Archived, MsgType,
                                       JidRequestor, JidArchive) of
                            {ok, Msg} -> {true, {Id, USec, Msg}};
                            {error, _} -> false
                        end
                end
        end,
    Msgs = lists:filtermap(ToEntry, p1_queue:to_list(Queue)),
    {Page, IsComplete} =
        case RSM of
            #rsm_set{max = Max, before = Before} when is_binary(Before) ->
                %% Paging backwards: cut from the end of the history.
                filter_by_max(lists:reverse(Msgs), Max);
            #rsm_set{max = Max} ->
                filter_by_max(Msgs, Max);
            _ ->
                {Msgs, true}
        end,
    {Page, IsComplete, Total};
select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
    case might_expose_jid(Query, MsgType) of
        true ->
            {[], true, 0};
        false ->
            FromMucArchive = mod_mam_opt:user_mucsub_from_muc_archive(LServer),
            if MsgType =:= chat, FromMucArchive =:= true ->
                   select_with_mucsub(LServer, JidRequestor, JidArchive,
                                      Query, RSM, Flags);
               true ->
                   db_select(LServer, JidRequestor, JidArchive,
                             Query, RSM, MsgType, Flags)
            end
    end.

%% Serve a `chat' archive query when mucsub messages are taken from the
%% MUC archives.
%%
%% If the query has a `with' JID on one of the MUC hosts of LServer, that
%% room's archive is queried directly (as a member, with mam = true).  A
%% `with' JID on any other host goes to the user's own archive.  Without a
%% `with' JID, the backend's select_with_mucsub/6 is used when exported,
%% otherwise the generic fallback.
select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
    MucHosts = mod_muc_admin:find_hosts(LServer),
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    case proplists:get_value(with, Query) of
        #jid{lserver = PeerHost} = With ->
            PeerIsRoom = lists:member(PeerHost, MucHosts),
            if PeerIsRoom ->
                   RoomState = #state{config = #config{mam = true}},
                   select(LServer, JidRequestor, With, Query, RSM,
                          {groupchat, member, RoomState});
               true ->
                   db_select(LServer, JidRequestor, JidArchive,
                             Query, RSM, chat, Flags)
            end;
        _NoPeer ->
            HasNative = erlang:function_exported(Backend, select_with_mucsub, 6),
            if HasNative ->
                   Backend:select_with_mucsub(LServer, JidRequestor, JidArchive,
                                              Query, RSM, Flags);
               true ->
                   select_with_mucsub_fallback(LServer, JidRequestor, JidArchive,
                                               Query, RSM, Flags)
            end
    end.

%% Backend-independent implementation of a `chat' query that also includes
%% messages from the rooms the requester is subscribed to.
%%
%% The user's own archive is queried first; a database error there is
%% returned unchanged.  Then every subscribed room's archive is queried
%% with the same Query/RSM, the room entries are wrapped as mucsub events
%% and merged into the result, and the merged list is trimmed to the RSM
%% `max'.  Returns {Entries, IsComplete, Count} or {error, _}.
select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
    case db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags) of
	{error, _} = Err ->
	    Err;
	{Entries, All, Count} ->
	    %% Paging direction: `desc' when the RSM set has a binary
	    %% `before' value, `asc' otherwise.
	    {Dir, Max} = case RSM of
			     #rsm_set{max = M, before = V} when is_binary(V) ->
				 {desc, M};
			     #rsm_set{max = M} ->
				 {asc, M};
			     _ ->
				 {asc, undefined}
			 end,
	    %% Only the first MUC host returned for LServer is asked for the
	    %% requester's subscriptions; a lookup error means "no rooms".
	    SubRooms = case mod_muc_admin:find_hosts(LServer) of
			   [First|_] ->
			       case mod_muc:get_subscribed_rooms(First, JidRequestor) of
				   {ok, L} -> L;
				   {error, _} -> []
			       end;
			   _ ->
			       []
		       end,
	    SubRoomJids = [Jid || {Jid, _, _} <- SubRooms],
	    %% Fold over the rooms: rooms whose query fails are skipped;
	    %% otherwise entries are merged on tuple element 2, the
	    %% completeness flags are AND-ed and the counts are summed.
	    %% NOTE(review): keymerge assumes both lists are already sorted
	    %% on element 2 - confirm that backends return them that way.
	    {E2, A2, C2} =
		lists:foldl(
		  fun(MucJid, {E0, A0, C0}) ->
			  case select(LServer, JidRequestor, MucJid, Query, RSM,
				      {groupchat, member, #state{config = #config{mam = true}}}) of
			      {error, _} ->
				  {E0, A0, C0};
			      {E, A, C} ->
				  {lists:keymerge(2, E0, wrap_as_mucsub(E, JidRequestor)),
				   A0 andalso A, C0 + C}
			  end
		  end, {Entries, All, Count}, SubRoomJids),
	    %% Trim the merged list back to one page.  The result is only
	    %% reported complete when nothing was cut off here.
	    case {Dir, Max} of
		{_, undefined} ->
		    {E2, A2, C2};
		{desc, _} ->
		    %% Paging backwards: keep the last Max entries.
		    Start = case length(E2) of
				Len when Len < Max -> 1;
				Len -> Len - Max + 1
			    end,
		    Sub = lists:sublist(E2, Start, Max),
		    {Sub, if Sub == E2 -> A2; true -> false end, C2};
		_ ->
		    %% Paging forwards: keep the first Max entries.
		    Sub = lists:sublist(E2, 1, Max),
		    {Sub, if Sub == E2 -> A2; true -> false end, C2}
	    end
    end.

%% Dispatch a query to the database backend.  Backends that export
%% select/7 receive the Flags argument; the others are called through
%% select/6 without it.
db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
    Backend = gen_mod:db_mod(LServer, ?MODULE),
    TakesFlags = erlang:function_exported(Backend, select, 7),
    if TakesFlags ->
           Backend:select(LServer, JidRequestor, JidArchive,
                          Query, RSM, MsgType, Flags);
       true ->
           Backend:select(LServer, JidRequestor, JidArchive,
                          Query, RSM, MsgType)
    end.

%% Wrap every {Id, IdInt, Message} entry of a result list as a mucsub
%% event addressed to the requester's bare JID.  The first two tuple
%% elements are kept as they are.
wrap_as_mucsub(Messages, #jid{lserver = LServer} = Requester) ->
    BareRequester = jid:remove_resource(Requester),
    ServerJid = jid:make(<<>>, LServer, <<>>),
    [{Id, IdInt, wrap_as_mucsub(Msg, BareRequester, ServerJid)}
     || {Id, IdInt, Msg} <- Messages].

%% Turn a forwarded room message into a forwarded mucsub notification for
%% Requester.
%%
%% Only a #forwarded{} holding a delay and exactly one #message{} is
%% rewritten; anything else is returned untouched.  The original message
%% becomes the payload of a pubsub item; its mam_archived and stanza_id
%% sub-elements (if any) are moved to the outer message with `by' set to
%% Requester.  The item id is the stanza-id's id, or a random string when
%% there is no stanza-id.  The pubsub node depends on whether the message
%% has a subject.
wrap_as_mucsub(#forwarded{delay = #delay{stamp = Stamp, desc = Desc},
                          sub_els = [#message{from = From,
                                              sub_els = SubEls,
                                              subject = Subject} = Inner]},
               Requester, ReqServer) ->
    {Moved0, Els1} =
        case lists:keytake(mam_archived, 1, SubEls) of
            {value, Archived, WithoutArchived} ->
                {[Archived#mam_archived{by = Requester}], WithoutArchived};
            _ ->
                {[], SubEls}
        end,
    {ItemId, Moved, Els2} =
        case lists:keytake(stanza_id, 1, Els1) of
            {value, #stanza_id{id = KnownId} = StanzaId, WithoutStanzaId} ->
                {KnownId,
                 [StanzaId#stanza_id{by = Requester} | Moved0],
                 WithoutStanzaId};
            _ ->
                {p1_rand:get_string(), Moved0, Els1}
        end,
    Payload = Inner#message{to = Requester, sub_els = Els2},
    Node = case Subject of
               [] -> ?NS_MUCSUB_NODES_MESSAGES;
               _ -> ?NS_MUCSUB_NODES_SUBJECT
           end,
    Item = #ps_item{id = ItemId, sub_els = [Payload]},
    Event = #ps_event{items = #ps_items{node = Node, items = [Item]}},
    Notification = #message{from = jid:remove_resource(From),
                            to = Requester,
                            id = ItemId,
                            sub_els = [Event | Moved]},
    #forwarded{delay = #delay{stamp = Stamp, desc = Desc, from = ReqServer},
               sub_els = [Notification]};
wrap_as_mucsub(Message, _Requester, _ReqServer) ->
    Message.


%% Convert an archived message into a #forwarded{} element ready to be
%% sent to the requester.
%%
%% The stored element is decoded; for `chat' and groupchat queries a
%% stanza-id for the archive JID is set on it; from/to are adjusted by
%% maybe_update_from_to/6; and the result is wrapped together with a delay
%% stamped with the archive timestamp.  Returns {ok, Forwarded}, or
%% {error, invalid_xml} (after logging) when decoding fails.
msg_to_el(#archive_msg{timestamp = TS, packet = El, nick = Nick,
                       peer = Peer, id = ID},
          MsgType, JidRequestor, #jid{lserver = LServer} = JidArchive) ->
    CodecOpts = ejabberd_config:codec_options(),
    %% Only the decode call itself is protected by the try.
    try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
        Decoded ->
            Tagged = case MsgType of
                         {groupchat, _, _} ->
                             set_stanza_id(Decoded, JidArchive, ID);
                         chat ->
                             set_stanza_id(Decoded, JidArchive, ID);
                         _ ->
                             Decoded
                     end,
            Readdressed = maybe_update_from_to(Tagged, JidRequestor,
                                               JidArchive, Peer,
                                               MsgType, Nick),
            Delay = #delay{stamp = TS, from = jid:make(LServer)},
            {ok, #forwarded{sub_els = [Readdressed], delay = Delay}}
    catch
        _:{xmpp_codec, Why} ->
            ?ERROR_MSG("Failed to decode raw element ~p from message "
                       "archive of user ~ts: ~ts",
                       [El, jid:encode(JidArchive), xmpp:format_error(Why)]),
            {error, invalid_xml}
    end.

%% Rewrite the addressing of a message taken from a room archive.
%%
%% For groupchat queries `from' becomes the room JID with the sender's
%% nick as resource and `to' is cleared.  A muc#user item carrying the
%% sender's real JID is prepended when that JID may be shown: the peer is
%% known and either it is the requester's own user@server, the room is not
%% anonymous, or the requester is a moderator.  Packets of any other query
%% type are returned unchanged.
maybe_update_from_to(#message{sub_els = Els} = Pkt, JidRequestor, JidArchive,
                     Peer, {groupchat, Role,
                            #state{config = #config{anonymous = Anon}}},
                     Nick) ->
    RealJidItem = [#muc_user{items = [#muc_item{jid = Peer}]}],
    Extra = case {Peer, JidRequestor} of
                {undefined, _} ->
                    [];
                {{U, S, _R}, #jid{luser = U, lserver = S}} ->
                    RealJidItem;
                _ when not Anon; Role == moderator ->
                    RealJidItem;
                _ ->
                    []
            end,
    Pkt#message{from = jid:replace_resource(JidArchive, Nick),
                to = undefined,
                sub_els = Extra ++ Els};
maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, _MsgType, _Nick) ->
    Pkt.

%% Deliver the result of an archive query to the requester.
%%
%% Every entry is wrapped in a #mam_result{} message from the archive to
%% the requester.  How the query is concluded depends on the namespace:
%%  * ?NS_MAM_0: an empty IQ result is routed first, then the result
%%    messages, then a final message carrying #mam_fin{} and a `no-store'
%%    hint; `ignore' is returned.
%%  * any other namespace: the result messages are routed and the IQ
%%    result is returned to the caller, carrying a #mam_query{} for
%%    ?NS_MAM_TMP and a #mam_fin{} otherwise.
-spec send([{binary(), integer(), xmlel()}],
	   count(), boolean(), iq()) -> iq() | ignore.
send(Msgs, Count, IsComplete,
     #iq{from = From, to = To,
         sub_els = [#mam_query{id = QID, xmlns = NS}]} = IQ) ->
    ToResult =
        fun({ID, _IDInt, El}) ->
                Wrapped = #mam_result{xmlns = NS,
                                      id = ID,
                                      queryid = QID,
                                      sub_els = [El]},
                #message{from = To, to = From, sub_els = [Wrapped]}
        end,
    Results = lists:map(ToResult, Msgs),
    RSMOut = make_rsm_out(Msgs, Count),
    Fin = if NS == ?NS_MAM_TMP ->
                 #mam_query{xmlns = NS, id = QID, rsm = RSMOut};
             NS == ?NS_MAM_0 ->
                 #mam_fin{xmlns = NS, id = QID, rsm = RSMOut,
                          complete = IsComplete};
             true ->
                 #mam_fin{xmlns = NS, rsm = RSMOut, complete = IsComplete}
          end,
    RouteResults =
        fun() ->
                lists:foreach(fun(Res) -> ejabberd_router:route(Res) end,
                              Results)
        end,
    if NS == ?NS_MAM_0 ->
           ejabberd_router:route(xmpp:make_iq_result(IQ)),
           RouteResults(),
           NoStore = #hint{type = 'no-store'},
           ejabberd_router:route(
             #message{from = To, to = From, sub_els = [Fin, NoStore]}),
           ignore;
       true ->
           RouteResults(),
           xmpp:make_iq_result(IQ, Fin)
    end.

%% Build the RSM set returned with a page of results: always the count,
%% plus the ids of the first and last entries when the page is not empty.
-spec make_rsm_out([{binary(), integer(), xmlel()}], count()) -> rsm_set().
make_rsm_out([], Count) ->
    #rsm_set{count = Count};
make_rsm_out([{FirstID, _, _} | _] = Page, Count) ->
    {LastID, _, _} = lists:last(Page),
    First = #rsm_first{data = FirstID},
    #rsm_set{first = First, last = LastID, count = Count}.

%% Limit a list of messages to at most Max entries.
%% Returns {Page, IsComplete}, where IsComplete tells whether the whole
%% input fitted.  `undefined' means "no limit"; any value that is neither
%% `undefined' nor a non-negative integer yields an empty, complete page.
filter_by_max(Msgs, Max) ->
    if Max =:= undefined ->
           {Msgs, true};
       is_integer(Max), Max >= 0 ->
           Page = lists:sublist(Msgs, Max),
           {Page, length(Msgs) =< Max};
       true ->
           {[], true}
    end.

%% Enforce the page-size policy on an RSM set.
%% Queries in the ?NS_MAM_TMP namespace are left alone.  For all others a
%% missing RSM set or a non-integer `max' gets ?DEF_PAGE_SIZE, and a `max'
%% above ?MAX_PAGE_SIZE is capped to it.
-spec limit_max(rsm_set(), binary()) -> rsm_set() | undefined.
limit_max(RSM, NS) ->
    if NS =:= ?NS_MAM_TMP ->
           RSM; % XEP-0313 v0.2 doesn't require clients to support RSM.
       RSM =:= undefined ->
           #rsm_set{max = ?DEF_PAGE_SIZE};
       true ->
           case RSM of
               #rsm_set{max = Max} when not is_integer(Max) ->
                   RSM#rsm_set{max = ?DEF_PAGE_SIZE};
               #rsm_set{max = Max} when Max > ?MAX_PAGE_SIZE ->
                   RSM#rsm_set{max = ?MAX_PAGE_SIZE};
               _ ->
                   RSM
           end
    end.

%% True when Now lies within [Start, End] using Erlang term comparison.
%% An End of `undefined' means the interval has no upper bound.
match_interval(Now, Start, End) ->
    NotBeforeStart = Now >= Start,
    case End of
        undefined ->
            NotBeforeStart;
        _ ->
            NotBeforeStart and (Now =< End)
    end.

%% Check a history entry's timestamp (Now) against the RSM paging bounds.
%%
%% A non-empty `after' id keeps only entries strictly newer than the id; a
%% non-empty `before' id keeps only entries strictly older.  The id is
%% expected to be a microsecond timestamp encoded as a decimal binary.
%% Anything else (no RSM set, no bound, empty id) matches every entry.
%%
%% An id that is not a valid integer is handled explicitly instead of via
%% an old-style `catch', which compared Now against an {'EXIT', _} tuple.
%% The outcome of that comparison is preserved: a malformed `after' id
%% matches everything, a malformed `before' id matches nothing.
match_rsm(Now, #rsm_set{'after' = ID}) when is_binary(ID), ID /= <<"">> ->
    try misc:usec_to_now(binary_to_integer(ID)) of
        Bound -> Now > Bound
    catch
        error:badarg -> true
    end;
match_rsm(Now, #rsm_set{before = ID}) when is_binary(ID), ID /= <<"">> ->
    try misc:usec_to_now(binary_to_integer(ID)) of
        Bound -> Now < Bound
    catch
        error:badarg -> false
    end;
match_rsm(_Now, _) ->
    true.

%% True when answering the query could reveal real JIDs: the room is
%% anonymous, the requester is not a moderator, and the query filters on
%% `with'.  Every other combination is considered safe.
might_expose_jid(Query, MsgType) ->
    case MsgType of
        {groupchat, Role, #state{config = #config{anonymous = true}}}
          when Role /= moderator ->
            proplists:is_defined(with, Query);
        _ ->
            false
    end.

%% Convert a list of JIDs into bare, lower-cased JID tuples.
%% `undefined' is treated as an empty list.
get_jids(Js) ->
    case Js of
        undefined ->
            [];
        _ ->
            [jid:tolower(jid:remove_resource(Jid)) || Jid <- Js]
    end.

%% Specifications of the administrative commands implemented by this
%% module: purging old archive messages (immediately or in rate-limited
%% batches, with status/abort companions) and removing a user's archive,
%% either entirely or for a single peer.
get_commands_spec() ->
    [#ejabberd_commands{name = delete_old_mam_messages, tags = [purge],
                        desc = "Delete MAM messages older than DAYS",
                        longdesc = "Valid message TYPEs: "
                                   "\"chat\", \"groupchat\", \"all\".",
                        module = ?MODULE, function = delete_old_messages,
                        args_desc = ["Type of messages to delete (chat, groupchat, all)",
                                     "Days to keep messages"],
                        args_example = [<<"all">>, 31],
                        args = [{type, binary}, {days, integer}],
                        result = {res, rescode}},
     #ejabberd_commands{name = delete_old_mam_messages_batch, tags = [purge],
                        desc = "Delete MAM messages older than DAYS",
                        note = "added in 22.05",
                        longdesc = "Valid message TYPEs: "
                                   "\"chat\", \"groupchat\", \"all\".",
                        module = ?MODULE, function = delete_old_messages_batch,
                        args_desc = ["Name of host where messages should be deleted",
                                     "Type of messages to delete (chat, groupchat, all)",
                                     "Days to keep messages",
                                     "Number of messages to delete per batch",
                                     "Desired rate of messages to delete per minute"],
                        args_example = [<<"localhost">>, <<"all">>, 31, 1000, 10000],
                        args = [{host, binary}, {type, binary}, {days, integer}, {batch_size, integer}, {rate, integer}],
                        result = {res, restuple},
                        result_desc = "Result tuple",
                        result_example = {ok, <<"Removal of 5000 messages in progress">>}},
     #ejabberd_commands{name = delete_old_mam_messages_status, tags = [purge],
                        desc = "Status of delete old MAM messages operation",
                        note = "added in 22.05",
                        module = ?MODULE, function = delete_old_messages_status,
                        args_desc = ["Name of host where messages should be deleted"],
                        args_example = [<<"localhost">>],
                        args = [{host, binary}],
                        result = {status, string},
                        %% Was "Status test"; aligned with the wording used by
                        %% abort_delete_old_mam_messages below.
                        result_desc = "Status text",
                        result_example = "Operation in progress, delete 5000 messages"},
     #ejabberd_commands{name = abort_delete_old_mam_messages, tags = [purge],
                        desc = "Abort currently running delete old MAM messages operation",
                        note = "added in 22.05",
                        module = ?MODULE, function = delete_old_messages_abort,
                        args_desc = ["Name of host where operation should be aborted"],
                        args_example = [<<"localhost">>],
                        args = [{host, binary}],
                        result = {status, string},
                        result_desc = "Status text",
                        result_example = "Operation aborted"},
     #ejabberd_commands{name = remove_mam_for_user, tags = [mam],
                        desc = "Remove mam archive for user",
                        module = ?MODULE, function = remove_mam_for_user,
                        args = [{user, binary}, {host, binary}],
                        args_rename = [{server, host}],
                        args_desc = ["Username", "Server"],
                        args_example = [<<"bob">>, <<"example.com">>],
                        result = {res, restuple},
                        result_desc = "Result tuple",
                        result_example = {ok, <<"MAM archive removed">>}},
     #ejabberd_commands{name = remove_mam_for_user_with_peer, tags = [mam],
                        desc = "Remove mam archive for user with peer",
                        module = ?MODULE, function = remove_mam_for_user_with_peer,
                        args = [{user, binary}, {host, binary}, {with, binary}],
                        args_rename = [{server, host}],
                        args_desc = ["Username", "Server", "Peer"],
                        args_example = [<<"bob">>, <<"example.com">>, <<"anne@example.com">>],
                        result = {res, restuple},
                        result_desc = "Result tuple",
                        result_example = {ok, <<"MAM archive removed">>}}
    ].

%% Option validators, one clause per module option.  Each clause returns
%% the econf validator to be used for that option's configured value.
%% Boolean switches.
mod_opt_type(compress_xml) ->
    econf:bool();
mod_opt_type(assume_mam_usage) ->
    econf:bool();
%% Default archiving policy: one of always | never | roster.
mod_opt_type(default) ->
    econf:enum([always, never, roster]);
mod_opt_type(request_activates_archiving) ->
    econf:bool();
mod_opt_type(clear_archive_on_room_destroy) ->
    econf:bool();
mod_opt_type(user_mucsub_from_muc_archive) ->
    econf:bool();
%% Access rule controlling who may change MAM preferences.
mod_opt_type(access_preferences) ->
    econf:acl();
%% Storage backend and cache tuning.
mod_opt_type(db_type) ->
    econf:db_type(?MODULE);
mod_opt_type(use_cache) ->
    econf:bool();
mod_opt_type(cache_size) ->
    econf:pos_int(infinity);
mod_opt_type(cache_missed) ->
    econf:bool();
mod_opt_type(cache_life_time) ->
    econf:timeout(second, infinity).

%% Default values of the module options for Host.
%% The database type and the cache settings have no fixed default here:
%% they are read from ejabberd_config / ejabberd_option for this host.
mod_options(Host) ->
    [{assume_mam_usage, false},
     {default, never},
     {request_activates_archiving, false},
     {compress_xml, false},
     {clear_archive_on_room_destroy, true},
     {access_preferences, all},
     {user_mucsub_from_muc_archive, false},
     {db_type, ejabberd_config:default_db(Host, ?MODULE)},
     {use_cache, ejabberd_option:use_cache(Host)},
     {cache_size, ejabberd_option:cache_size(Host)},
     {cache_missed, ejabberd_option:cache_missed(Host)},
     {cache_life_time, ejabberd_option:cache_life_time(Host)}].

%% Module documentation: a general description plus one entry per option
%% giving its accepted values and meaning.
%% Two wording errors in the option texts are corrected here
%% ("their client issue" -> "issues", "a separa mucsub message" ->
%% "a separate mucsub message").
mod_doc() ->
    #{desc =>
          ?T("This module implements "
             "https://xmpp.org/extensions/xep-0313.html"
             "[XEP-0313: Message Archive Management]. "
             "Compatible XMPP clients can use it to store their "
             "chat history on the server."),
      opts =>
          [{access_preferences,
            #{value => ?T("AccessName"),
              desc =>
                  ?T("This access rule defines who is allowed to modify the "
                     "MAM preferences. The default value is 'all'.")}},
           {assume_mam_usage,
            #{value => "true | false",
              desc =>
                  ?T("This option determines how ejabberd's "
                     "stream management code (see _`mod_stream_mgmt`_) "
                     "handles unacknowledged messages when the "
                     "connection is lost. Usually, such messages are "
                     "either bounced or resent. However, neither is "
                     "done for messages that were stored in the user's "
                     "MAM archive if this option is set to 'true'. In "
                     "this case, ejabberd assumes those messages will "
                     "be retrieved from the archive. "
                     "The default value is 'false'.")}},
           {default,
            #{value => "always | never | roster",
              desc =>
                  ?T("The option defines default policy for chat history. "
                     "When 'always' is set every chat message is stored. "
                     "With 'roster' only chat history with contacts from "
                     "user's roster is stored. And 'never' fully disables "
                     "chat history. Note that a client can change its "
                     "policy via protocol commands. "
                     "The default value is 'never'.")}},
           {request_activates_archiving,
            #{value => "true | false",
              desc =>
                  ?T("If the value is 'true', no messages are stored "
                     "for a user until their client issues a MAM request, "
                     "regardless of the value of the 'default' option. "
                     "Once the server received a request, that user's "
                     "messages are archived as usual. "
                     "The default value is 'false'.")}},
           {compress_xml,
            #{value => "true | false",
              desc =>
                  ?T("When enabled, new messages added to archives are "
                     "compressed using a custom compression algorithm. "
                     "This feature works only with SQL backends. "
                     "The default value is 'false'.")}},
           {clear_archive_on_room_destroy,
            #{value => "true | false",
              desc =>
                  ?T("Whether to destroy message archive of a room "
                     "(see _`mod_muc`_) when it gets destroyed. "
                     "The default value is 'true'.")}},
           {db_type,
            #{value => "mnesia | sql",
              desc =>
                  ?T("Same as top-level _`default_db`_ option, but applied to this module only.")}},
           {use_cache,
            #{value => "true | false",
              desc =>
                  ?T("Same as top-level _`use_cache`_ option, but applied to this module only.")}},
           {cache_size,
            #{value => "pos_integer() | infinity",
              desc =>
                  ?T("Same as top-level _`cache_size`_ option, but applied to this module only.")}},
           {cache_missed,
            #{value => "true | false",
              desc =>
                  ?T("Same as top-level _`cache_missed`_ option, but applied to this module only.")}},
           {cache_life_time,
            #{value => "timeout()",
              desc =>
                  ?T("Same as top-level _`cache_life_time`_ option, but applied to this module only.")}},
           {user_mucsub_from_muc_archive,
            #{value => "true | false",
              desc =>
                  ?T("When this option is disabled, for each individual "
                     "subscriber a separate mucsub message is stored. With this "
                     "option enabled, when a user fetches archive virtual "
                     "mucsub, messages are generated from muc archives. "
                     "The default value is 'false'.")}}]}.
